package com.atguigu.day09;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Example9 {
    public static void main(String[] args) {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        var streamTableEnv = StreamTableEnvironment
                .create(env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // 创建一张输入动态表
        streamTableEnv
                .executeSql(
                        "CREATE TABLE clicks (`username` STRING, `url` STRING)" +
                                " WITH (" +
                                "   'connector' = 'filesystem'," +
                                "   'path' = '/home/yuantuzhi/flinktutorial0905/src/main/resources/file.csv'," +
                                "   'format' = 'csv')"
                );

        // 创建一张输出动态表
        streamTableEnv
                .executeSql(
                        "CREATE TABLE result_table (`username` STRING, `cnt` BIGINT)" +
                                "  WITH ('connector' = 'print')"
                );

        // 将输入表的查询结果写入输出表
        streamTableEnv
                .executeSql(
                        "INSERT INTO result_table" +
                                "  SELECT `username`, COUNT(`username`) as `cnt` FROM clicks GROUP BY `username`"
                );
    }
}
